Apache Kafka — это платформа для обработки и передачи данных в реальном времени. Одной из ключевых концепций в Kafka является возможность объединения потоков данных. Объединение потоков данных в Kafka — это процесс объединения и обработки данных из различных топиков (потоков) с целью создания нового потока данных или выполнения какой-либо аналитики. Это позволяет эффективно управлять потоками данных и обеспечивать гибкость в архитектуре обработки данных, а также агрегировать, обрабатывать и анализировать данные из различных источников и направлять их в нужные места.
Объединение потоков: несколько практических примеров
Для работы с потоками в Kafka используется класс KStream библиотеки Kafka Streams. В следующем примере мы объединим данные из двух топиков — topicA и topicB — и направим их в новый топик mergedTopic:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MergeTopicsExample {
public static void main(String[] args) {
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "merge-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList("topicA", "topicB"));
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
ProducerRecord<String, String> newRecord = new ProducerRecord<>("mergedTopic", record.key(), record.value());
producer.send(newRecord);
}
}
}
}
В данном фрагменте кода мы создаем Kafka Consumer, подписываем его на topicA и topicB, затем создаем Kafka Producer и отправляем данные в mergedTopic.
В следующем примере мы объединим данные из топика userActions и выполним агрегацию суммы действий по каждому пользователю, направляя результат в userActionsAggregated:
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
public class AggregateStreamsExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> userActionsStream = builder.stream("userActions");
KStream<String, String> aggregatedStream = userActionsStream
.groupByKey()
.aggregate(
() -> "0", // Initial value for aggregation
(key, value, aggregate) -> String.valueOf(Integer.parseInt(aggregate) + 1),
Materialized.as("userActionsAggregated")
).toStream();
aggregatedStream.to("userActionsAggregated", Produced.with(Serdes.String(), Serdes.String()));
Topology topology = builder.build();
}
}
В данном примере мы используем Kafka Streams API для создания агрегированного потока данных. Мы группируем данные по ключу (пользователю) и применяем агрегатор, который увеличивает счетчик действий пользователя. Результат агрегации направляется в userActionsAggregated топик.
Таким образом, объединение потоков данных в Apache Kafka является мощным инструментом для агрегации, анализа и перенаправления данных в реальном времени. Это позволяет системе Kafka обеспечивать высокую доступность, надежность и масштабируемость при обработке данных в реальном времени.
Это делает Apache Kafka надежным и универсальным средством для хранения и обмена большими потоками данных, что активно применяется в задачах Data Science и разработке распределенных приложений.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
Источники
- https://kafka.apache.org/documentation/
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных



